PostgresHook: Add upsert rows support using ON CONFLICT#67045
PostgresHook: Add upsert rows support using ON CONFLICT#67045SameerMesiah97 wants to merge 3 commits into
Conversation
…sycopg3. Consolidate duplicated insert/upsert and dialect tests into a shared base class while preserving version-specific behavior and lineage coverage.
…oduce upsert_rows and UPSERT SQL generation with support for DO UPDATE, DO NOTHING, chunked commits, and execute_batch optimization. Added unit tests for UPSERT SQL generation, DO UPDATE and DO NOTHING behavior, chunked execution, execute_batch execution, validation checks, and transaction handling.
64de06d to
5709c06
Compare
justinpakzad
left a comment
There was a problem hiding this comment.
Nice PR. Left a couple of comments. I know it's still in draft but figured I'd leave some feedback anyways.
| target_fields: list[str], | ||
| conflict_fields: list[str], | ||
| update_fields: list[str] | None = None, | ||
| **kwargs, |
There was a problem hiding this comment.
I don't think we need the kwargs here as there is nothing consuming them.
| def _generate_upsert_sql( | ||
| self, | ||
| table: str, | ||
| values: tuple[Any, ...] | list[Any], |
There was a problem hiding this comment.
Do we need to pass in the values here? The only thing it's used for is to produce the right number of placeholders but I think that can just be done with len(target_fields).
| sql = self._generate_upsert_sql( | ||
| table=table, | ||
| values=values[0], | ||
| target_fields=target_fields, | ||
| conflict_fields=conflict_fields, | ||
| update_fields=update_fields, | ||
| ) |
There was a problem hiding this comment.
Related to my comment above - if we don't need to pass values since it's just used for the number of placeholders, I don't think we need to regenerate the same SQL string on every chunk. This can just be done once outside of the loop.
| if fast_executemany: | ||
| # execute_batch reduces round trips by batching parameter sets. | ||
| execute_batch( |
There was a problem hiding this comment.
Do we need a guard here since psycopg3 does not support execute_batch? Maybe using the USE_PSYCOPG3 constant that's used in other parts of the code. Either logging a warning and defaulting back to cur.executemany or raising an error.
| table, | ||
| ) | ||
|
|
||
| if sql: |
There was a problem hiding this comment.
Also related to my comment above, if we construct the query once outside the loop then this would need to be updated. This could be if nb_rows > 0.
That’s perfectly fine. I will address the feedback once the dependency is merged. |
Description
This change adds a new
PostgresHook.upsert_rowsmethod that provides native PostgreSQL UPSERT support usingINSERT ... ON CONFLICT.The new method supports configurable conflict targets through
conflict_fieldsand selective updates throughupdate_fields. Whenupdate_fieldsis omitted or empty, conflicting rows are ignored usingDO NOTHING.upsert_rowsreuses the existing batching, transaction handling, serialization, and lineage behavior used byinsert_rows, while introducing PostgreSQL-specific UPSERT semantics that are not currently exposed through the generic insert abstraction.This PR is dependent on PR #66893 merging first.
Rationale
DbApiHook.insert_rowscurrently supports a genericreplace=Trueabstraction delegated through dialect-specific SQL generation. However, PostgreSQL UPSERT semantics require additional concepts that are not representable through the existing API, including explicit conflict targets and selective update columns.Supporting PostgreSQL-native UPSERT behavior through
insert_rowswould require introducing PostgreSQL-specific arguments such asconflict_fieldsandupdate_fieldsinto the shared publicDbApiHook.insert_rowsAPI. SinceDbApiHookis inherited broadly across providers, expanding the generic insert abstraction with provider-specific UPSERT semantics would increase API complexity and introduce ambiguous behavior for non-PostgreSQL hooks.Adding a dedicated
PostgresHook.upsert_rowsmethod keeps PostgreSQLON CONFLICTsemantics explicit and self-contained while avoiding backwards compatibility and abstraction concerns in the sharedDbApiHookinterface.The implementation uses PostgreSQL-native
INSERT ... ON CONFLICTsemantics rather thanMERGE, sinceON CONFLICTis the established and more broadly compatible UPSERT mechanism across supported PostgreSQL versions.Tests
Added unit tests verifying that:
ON CONFLICT DO UPDATESQL.DO NOTHINGbehavior is generated whenupdate_fieldsis omitted.fast_executemany=Trueusespsycopg2.extras.execute_batch.commit_everycorrectly chunks UPSERT operations across transactions.target_fieldsandconflict_fieldsraise validation errors.Backwards Compatibility
This change introduces a new provider-specific API and does not modify existing
insert_rowsbehavior or sharedDbApiHookinterfaces.